package cn.doitedu.rtmk.demo.demo4;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;

import java.io.IOException;
import java.util.Map;

@Slf4j
public class ParamJsonUtil {

    public static RuleModelCalculator generateCalculator(String paramJson, RuntimeContext runtimeContext) throws IOException {
        JSONObject jsonObject = JSON.parseObject(paramJson);

        String ruleModelId = jsonObject.getString("ruleModelId");
        String ruleId = jsonObject.getString("ruleId");

        // 如果是生产级代码，这里应该对规则参数进行各类合法性校验
        if(StringUtils.isBlank(ruleModelId) || StringUtils.isBlank(ruleId)) return null;


        // 判断本次收到的参数，属于哪一种规则模型
        if("rule_model_001".equals(ruleModelId)){
            // 构造模型1的运算机
            RuleModel001Calculator ruleModel001Calculator = new RuleModel001Calculator();
            // 初始化该运算机
            ruleModel001Calculator.init(paramJson, runtimeContext);
            // 将初始化好的运算机，放入广播状态
            //broadcastState.put(ruleId,ruleModel001Calculator);

                return ruleModel001Calculator;
        }else {
            // 构造模型2的运算机
            RuleModel002Calculator ruleModel002Calculator = new RuleModel002Calculator();
            // 初始化该运算机
            ruleModel002Calculator.init(paramJson, runtimeContext);
            // 将初始化好的运算机，放入广播状态
            //broadcastState.put(ruleId,ruleModel002Calculator);
            return ruleModel002Calculator;
        }
    }


    /**
     * 从广播状态的规则json中，恢复各个运算机
     * @param broadcastState
     * @param calculatorMap
     * @param runtimeContext
     * @throws Exception
     */
    public static void restoreCalculators(ReadOnlyBroadcastState<String, String> broadcastState,
                                    Map<String,RuleModelCalculator> calculatorMap,
                                    RuntimeContext runtimeContext) throws Exception {
        Iterable<Map.Entry<String, String>> entries = broadcastState.immutableEntries();
        // 如果参数状态不为null
        // 则遍历参数状态中的每一个参数json
        for (Map.Entry<String, String> entry : entries) {
            // 根据参数json构造并初始化运算机对象
            RuleModelCalculator calculator = ParamJsonUtil.generateCalculator(entry.getValue(), runtimeContext);
            // 将运算机加入全局列表
            if (calculator != null) {
                calculatorMap.put(entry.getKey(), calculator);
                log.warn("恢复了一个运算机,其规则ID为:{}",entry.getKey());
            }
        }
    }


}
